import logging
import mimetypes
import zipfile
from pathlib import Path
from typing import List, Dict
from xml.etree import ElementTree
class UnstructuredDataPipeline:
    """Complete pipeline for processing varied document types.

    A document is routed to a per-format handler based on the MIME type
    guessed from its file name. The handlers implemented in this class
    return a dict with at least ``text``, ``method`` and ``confidence``;
    the PDF result is produced by ``PDFProcessor`` and its exact keys are
    not visible here.
    """

    # Fixed score attached to every image result; it is a constant, not a
    # value measured per image.
    IMAGE_CONFIDENCE = 0.85
    # Score for formats whose text is read straight from the file, with no
    # recognition step that could introduce errors.
    DIRECT_TEXT_CONFIDENCE = 1.0

    _DOCX_MIME = (
        'application/vnd.openxmlformats-officedocument'
        '.wordprocessingml.document'
    )
    # Used only when mimetypes cannot identify the file: the .docx mapping is
    # not guaranteed to be present in its tables on every platform.
    _SUFFIX_FALLBACKS = {'.docx': _DOCX_MIME}
    # XML namespace of the WordprocessingML elements inside a .docx archive.
    _WORD_NS = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'

    def __init__(self):
        """Create the PDF processor and the MIME-type dispatch table."""
        self.pdf_processor = PDFProcessor()
        self.supported_types = {
            'application/pdf': self.process_pdf,
            'image/jpeg': self.process_image,
            'image/png': self.process_image,
            'text/plain': self.process_text,
            self._DOCX_MIME: self.process_docx,
        }

    def process_document(self, file_path: str) -> Dict:
        """Route document to appropriate processor.

        Args:
            file_path: Path of the document; only its name is used to pick
                the handler, the content is not sniffed.

        Returns:
            The dict produced by the selected handler.

        Raises:
            ValueError: If the guessed MIME type has no registered handler
                (including when no type could be guessed at all).
        """
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type is None:
            suffix = Path(file_path).suffix.lower()
            mime_type = self._SUFFIX_FALLBACKS.get(suffix)
        if mime_type not in self.supported_types:
            raise ValueError(f"Unsupported file type: {mime_type}")
        processor = self.supported_types[mime_type]
        return processor(file_path)

    def process_pdf(self, path: str) -> Dict:
        """Process PDF with quality checks.

        Delegates to ``self.pdf_processor`` and, when
        ``extract_tables_from_pdf`` finds tables, attaches them under
        ``tables`` and records a note in ``warnings``.
        """
        result = self.pdf_processor.process_pdf(path)
        # Extract tables separately if present
        tables = extract_tables_from_pdf(path)
        if tables:
            result['tables'] = tables
            # The processor's result may not carry a 'warnings' list yet.
            result.setdefault('warnings', []).append(
                f"Found {len(tables)} tables"
            )
        return result

    def process_image(self, path: str) -> Dict:
        """Process image by running both text extraction and captioning.

        No screenshot-vs-photo decision is made: both helpers are always
        called and both outputs are returned, so downstream code can use
        whichever representation fits.
        """
        text = extract_image_text(path)
        caption = generate_image_caption(path)
        # Combine both for rich representation
        return {
            "text": text,
            "caption": caption,
            "method": "image_processing",
            "confidence": self.IMAGE_CONFIDENCE,
        }

    def process_text(self, path: str) -> Dict:
        """Read a plain-text file as UTF-8.

        Bytes that are not valid UTF-8 are replaced rather than aborting the
        document; when that happens a note is added to ``warnings``.

        Raises:
            OSError: If the file cannot be read.
        """
        raw = Path(path).read_bytes()
        warnings = []
        try:
            text = raw.decode('utf-8')
        except UnicodeDecodeError:
            text = raw.decode('utf-8', errors='replace')
            warnings.append("Invalid UTF-8 bytes were replaced while decoding")
        return {
            "text": text,
            "method": "text_read",
            "confidence": self.DIRECT_TEXT_CONFIDENCE,
            "warnings": warnings,
        }

    def process_docx(self, path: str) -> Dict:
        """Extract paragraph text from a .docx file using only the stdlib.

        Reads ``word/document.xml`` from the archive, joins the text runs of
        each paragraph and separates paragraphs with newlines. Headers,
        footers, footnotes and tab/line-break elements are not included.

        Raises:
            zipfile.BadZipFile: If the file is not a valid zip archive.
            KeyError: If the archive has no ``word/document.xml`` member.
            xml.etree.ElementTree.ParseError: If that member is not
                well-formed XML.
        """
        with zipfile.ZipFile(path) as archive:
            document_xml = archive.read('word/document.xml')
        # NOTE(review): xml.etree is not hardened against maliciously
        # constructed XML; if documents come from untrusted sources, consider
        # a hardened parser before deploying.
        root = ElementTree.fromstring(document_xml)
        text_tag = f'{self._WORD_NS}t'
        paragraphs = [
            ''.join(node.text or '' for node in paragraph.iter(text_tag))
            for paragraph in root.iter(f'{self._WORD_NS}p')
        ]
        return {
            "text": "\n".join(paragraphs),
            "method": "docx_extraction",
            "confidence": self.DIRECT_TEXT_CONFIDENCE,
            "warnings": [],
        }

    def process_batch(
        self,
        directory: str,
        file_pattern: str = "*.*"
    ) -> List[Dict]:
        """Process all documents in directory.

        Args:
            directory: Directory to scan (non-recursive unless the pattern
                itself recurses).
            file_pattern: Glob pattern relative to ``directory``.

        Returns:
            One result dict per successfully processed file, each with a
            ``source`` key holding the file path, ordered by path.
            Files that fail (including unsupported types) are logged and
            skipped so one bad document does not abort the batch.
        """
        results = []
        # Sorted so the output order does not depend on filesystem order.
        for file_path in sorted(Path(directory).glob(file_pattern)):
            # "*.*" also matches directories whose name contains a dot.
            if not file_path.is_file():
                continue
            try:
                result = self.process_document(str(file_path))
                result['source'] = str(file_path)
            except Exception as e:
                logging.error("Failed to process %s: %s", file_path, e)
            else:
                results.append(result)
        return results
# Production usage: build the pipeline once, then run it over the whole
# document corpus in a single batch.
pipeline = UnstructuredDataPipeline()
results = pipeline.process_batch("./documents/")

# Store in RAG system, keeping only documents above the quality threshold.
for doc in results:
    if not (doc['confidence'] > 0.7):
        # Below the quality threshold: leave it out of the vector store.
        continue
    content = doc['text']
    metadata = {
        'source': doc['source'],
        'method': doc['method'],
        'confidence': doc['confidence'],
        # Not every handler is guaranteed to report warnings.
        'warnings': doc.get('warnings', []),
    }
    store_in_vector_db(content=content, metadata=metadata)